##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2025, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################

"""
Implementation of Connection.
It is a wrapper around the actual psycopg3 driver, and connection
object.
"""

import os
import secrets
import datetime
import asyncio
import copy
from collections import deque
import psycopg
from flask import g, current_app
from flask_babel import gettext
from flask_security import current_user
from pgadmin.utils.crypto import decrypt
from psycopg._encodings import py_codecs as encodings

import config
from pgadmin.model import User
from pgadmin.utils.exception import ConnectionLost, CryptKeyMissing
from pgadmin.utils import get_complete_file_path
from ..abstract import BaseConnection
from .cursor import DictCursor, AsyncDictCursor, AsyncDictServerCursor
from .typecast import register_global_typecasters,\
    register_string_typecasters, register_binary_typecasters, \
    register_array_to_string_typecasters, ALL_JSON_TYPES
from .encoding import get_encoding, configure_driver_encodings
from pgadmin.utils import csv_lib as csv
from pgadmin.utils.master_password import get_crypt_key
from io import StringIO
from pgadmin.utils.locker import ConnectionLocker
from pgadmin.utils.driver import get_driver


# On Windows, Psycopg is not compatible with the default ProactorEventLoop.
# So, setting to SelectorEventLoop.
if os.name == 'nt':
    asyncio.set_event_loop_policy(
        asyncio.WindowsSelectorEventLoopPolicy()
    )

_ = gettext

# Register global type caster which will be applicable to all connections.
register_global_typecasters()
configure_driver_encodings(encodings)


class Connection(BaseConnection):
    """
    class Connection(object)

        A wrapper class, which wraps the psycopg3 connection object, and
        delegate the execution to the actual connection object, when required.

    Methods:
    -------
    * connect(**kwargs)
      - Connect the PostgreSQL/EDB Postgres Advanced Server using the psycopg3
      driver

    * execute_scalar(query, params, formatted_exception_msg)
      - Execute the given query and returns single datum result

    * execute_async(query, params, formatted_exception_msg)
      - Execute the given query asynchronously and returns result.

    * execute_void(query, params, formatted_exception_msg)
      - Execute the given query with no result.

    * execute_2darray(query, params, formatted_exception_msg)
      - Execute the given query and returns the result as a 2 dimensional
        array.

    * execute_dict(query, params, formatted_exception_msg)
      - Execute the given query and returns the result as an array of dict
        (column name -> value) format.

    * connected()
      - Get the status of the connection.
        Returns True if connected, otherwise False.

    * reset()
      - Reconnect the database server (if possible)

    * transaction_status()
      - Transaction Status

    * ping()
      - Ping the server.

    * _release()
      - Release the connection object of psycopg3

    * _reconnect()
      - Attempt to reconnect to the database

    * _wait(conn)
      - This method is used to wait for asynchronous connection. This is a
        blocking call.

    * _wait_timeout(conn)
      - This method is used to wait for asynchronous connection with timeout.
        This is a non blocking call.

    * poll(formatted_exception_msg)
      - This method is used to poll the data of query running on asynchronous
        connection.

    * status_message()
      - Returns the status message returned by the last command executed on
      the server.

    * rows_affected()
      - Returns the no of rows affected by the last command executed on
      the server.

    * cancel_transaction(conn_id, did=None)
      - This method is used to cancel the transaction for the
        specified connection id and database id.

    * messages()
      - Returns the list of messages/notices sends from the PostgreSQL database
        server.

    * _formatted_exception_msg(exception_obj, formatted_msg)
      - This method is used to parse the psycopg.Error object and returns the
        formatted error message if flag is set to true else return
        normal error message.

    * check_notifies(required_polling)
      - Check for the notify messages by polling the connection or after
        execute is there in notifies.

    * get_notifies()
      - This function will returns list of notifies received from database
        server.

    * pq_encrypt_password_conn()
      - This function will return the encrypted password for database server
      - greater than or equal to 10.
    """
    UNAUTHORIZED_REQUEST = gettext("Unauthorized request.")
    CURSOR_NOT_FOUND = \
        gettext("Cursor could not be found for the async connection.")
    ARGS_STR = "{0}#{1}"

    def __init__(self, manager, conn_id, db, **kwargs):
        assert (manager is not None)
        assert (conn_id is not None)

        auto_reconnect = kwargs.get('auto_reconnect', True)
        async_ = kwargs.get('async_', 0)
        use_binary_placeholder = kwargs.get('use_binary_placeholder', False)
        array_to_string = kwargs.get('array_to_string', False)

        self.conn_id = conn_id
        self.manager = manager
        self.db = db if db is not None else manager.db
        self.conn = None
        self.auto_reconnect = auto_reconnect
        self.async_ = async_
        self.__async_cursor = None
        self.__async_query_id = None
        self.__async_query_error = None
        self.__backend_pid = None
        self.execution_aborted = False
        self.row_count = 0
        self.__notices = None
        self.__notifies = None
        self.password = None
        # This flag indicates the connection status (connected/disconnected).
        self.wasConnected = False
        # This flag indicates the connection reconnecting status.
        self.reconnecting = False
        self.use_binary_placeholder = use_binary_placeholder
        self.array_to_string = array_to_string
        self.qtLiteral = get_driver(config.PG_DEFAULT_DRIVER).qtLiteral
        self._autocommit = True

        super(Connection, self).__init__()

    def as_dict(self):
        """
        Returns the dictionary object representing this object.
        """
        # In case, it cannot be auto reconnectable, or already been released,
        # then we will return None.
        if not self.auto_reconnect and not self.conn:
            return None

        res = dict()
        res['conn_id'] = self.conn_id
        res['database'] = self.db
        res['async_'] = self.async_
        res['wasConnected'] = self.wasConnected
        res['auto_reconnect'] = self.auto_reconnect
        res['use_binary_placeholder'] = self.use_binary_placeholder
        res['array_to_string'] = self.array_to_string

        return res

    def __repr__(self):
        return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format(
            self.conn_id, self.db,
            'Connected' if self.conn and not self.conn.closed else
            "Disconnected",
            self.async_
        )

    def __str__(self):
        return self.__repr__()

    def _check_user_password(self, kwargs):
        """
        Check user and password.
        """
        password = None
        encpass = None
        is_update_password = True

        if 'user' in kwargs and kwargs['password']:
            password = kwargs['password']
            kwargs.pop('password')
            is_update_password = False
        else:
            if 'encpass' in kwargs:
                encpass = kwargs['encpass']
            else:
                encpass = kwargs['password'] if 'password' in kwargs else None

        return password, encpass, is_update_password

    def _decode_password(self, encpass, manager, password, crypt_key):
        if encpass:
            # Fetch Logged in User Details.
            user = User.query.filter_by(id=current_user.id).first()

            if user is None:
                return True, self.UNAUTHORIZED_REQUEST, password

            try:
                password = decrypt(encpass, crypt_key)
                # password is in bytes, for python3 we need it in string
                if isinstance(password, bytes):
                    password = password.decode()
            except Exception as e:
                manager.stop_ssh_tunnel()
                current_app.logger.exception(e)
                return True, \
                    _(
                        "Failed to decrypt the saved password.\nError: {0}"
                    ).format(str(e)), password
        return False, '', password

    def connect(self, **kwargs):
        if self.conn:
            if self.conn.closed:
                self.conn = None
            else:
                return True, None

        manager = self.manager

        crypt_key_present, crypt_key = get_crypt_key()
        if not crypt_key_present:
            raise CryptKeyMissing()
        password, encpass, is_update_password = \
            self._check_user_password(kwargs)

        passfile = kwargs['passfile'] if 'passfile' in kwargs else None
        tunnel_password = kwargs['tunnel_password'] if 'tunnel_password' in \
                                                       kwargs else ''

        # Check SSH Tunnel needs to be created
        if manager.use_ssh_tunnel == 1 and not manager.tunnel_created:
            status, error = manager.create_ssh_tunnel(tunnel_password)
            if not status:
                return False, error

        # Check SSH Tunnel is alive or not.
        if manager.use_ssh_tunnel == 1:
            manager.check_ssh_tunnel_alive()

        if is_update_password:
            if encpass is None:
                encpass = self.password or getattr(manager, 'password', None)

            self.password = encpass

        # Reset the existing connection password
        if self.reconnecting is not False:
            self.password = None

        if not crypt_key_present:
            raise CryptKeyMissing()

        is_error, errmsg, password = self._decode_password(
            encpass, manager, password, crypt_key)
        if is_error:
            return False, errmsg

        # If no password credential is found then connect request might
        # come from Query tool, ViewData grid, debugger etc tools.
        # we will check for pgpass file availability from connection manager
        # if it's present then we will use it
        if not password and not encpass and not passfile:
            passfile = manager.get_connection_param_value('passfile')
            if manager.passexec:
                password = manager.passexec.get()

        try:
            database = self.db
            if 'user' in kwargs and kwargs['user']:
                user = kwargs['user']
            else:
                user = manager.user
            conn_id = self.conn_id

            import os
            os.environ['PGAPPNAME'] = '{0} - {1}'.format(
                config.APP_NAME, conn_id)

            ssl_key = get_complete_file_path(
                manager.get_connection_param_value('sslkey'))
            sslmode = manager.get_connection_param_value('sslmode')
            if ssl_key and sslmode in \
                    ['require', 'verify-ca', 'verify-full']:
                ssl_key_file_permission = \
                    int(oct(os.stat(ssl_key).st_mode)[-3:])
                if ssl_key_file_permission > 600:
                    os.chmod(ssl_key, 0o600)

            with ConnectionLocker(manager.kerberos_conn):
                # Create the connection string
                connection_string = manager.create_connection_string(
                    database, user, password)

                if self.async_:
                    autocommit = True
                    if 'auto_commit' in kwargs:
                        autocommit = kwargs['auto_commit']

                    async def connectdbserver():
                        return await psycopg.AsyncConnection.connect(
                            connection_string,
                            cursor_factory=AsyncDictCursor,
                            autocommit=autocommit,
                            prepare_threshold=manager.prepare_threshold
                        )
                    pg_conn = asyncio.run(connectdbserver())
                    pg_conn.server_cursor_factory = AsyncDictServerCursor
                else:
                    pg_conn = psycopg.Connection.connect(
                        connection_string,
                        cursor_factory=DictCursor,
                        prepare_threshold=manager.prepare_threshold)

        except psycopg.Error as e:
            manager.stop_ssh_tunnel()
            if hasattr(e, 'pgerror'):
                msg = e.pgerror
            elif e.diag.message_detail:
                msg = e.diag.message_detail
            else:
                msg = str(e)
            current_app.logger.info(
                "Failed to connect to the database server(#{server_id}) for "
                "connection ({conn_id}) with error message as below"
                ":{msg}".format(
                    server_id=self.manager.sid,
                    conn_id=conn_id,
                    msg=msg
                )
            )
            return False, msg

        # Overwrite connection notice attr to support
        # more than 50 notices at a time
        pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH)
        pg_conn.add_notify_handler(self.check_notifies)
        pg_conn.add_notice_handler(self.get_notices)

        self.conn = pg_conn
        self.wasConnected = True
        try:
            status, msg = self._initialize(conn_id, **kwargs)
        except Exception as e:
            manager.stop_ssh_tunnel()
            current_app.logger.exception(e)
            self.conn = None
            if not self.reconnecting:
                self.wasConnected = False
            raise e

        if status and is_update_password:
            manager._update_password(encpass)
        else:
            if not self.reconnecting and is_update_password:
                self.wasConnected = False

        return status, msg

    def _set_auto_commit(self, kwargs):
        """
        autocommit flag does not work with asynchronous connections.
        By default asynchronous connection runs in autocommit mode.
        :param kwargs:
        :return:
        """
        if self.async_ == 0:
            if 'autocommit' in kwargs and kwargs['autocommit'] is False:
                self.conn.autocommit = False
            else:
                self.conn.autocommit = True

    def _set_role(self, manager, cur, conn_id, **kwargs):
        """
        Set role
        :param manager:
        :param cur:
        :param conn_id:
        :return:
        """
        is_set_role = False
        role = None
        status = None

        if 'role' in kwargs and kwargs['role']:
            is_set_role = True
            role = kwargs['role']
        elif manager.role:
            is_set_role = True
            role = manager.role

        if is_set_role:
            _query = "SELECT rolname from pg_roles WHERE rolname = {0}" \
                     "".format(self.qtLiteral(role, self.conn))
            _status, res = self.execute_scalar(_query)

            if res:
                status = self._execute(cur, "SET ROLE TO {0}".format(
                    self.qtLiteral(role, self.conn)))
            else:
                # If role is not found then set the status to role
                # for showing the proper error message
                status = role

            if status is not None:
                self.conn.close()
                self.conn = None
                current_app.logger.error(
                    "Connect to the database server (#{server_id}) for "
                    "connection ({conn_id}), but - failed to setup the role "
                    " {msg}".format(
                        server_id=self.manager.sid,
                        conn_id=conn_id,
                        msg=status
                    )
                )
                return True, \
                    _(
                        "Failed to setup the role \n{0}"
                    ).format(status)
        return False, ''

    def _execute(self, cur, query, params=None):
        formatted_exception_msg = self._formatted_exception_msg
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg.Error as pe:
            cur.close_cursor()
            return formatted_exception_msg(pe, False)
        return None

    def _initialize(self, conn_id, **kwargs):
        self.execution_aborted = False
        self.__backend_pid = self.conn.info.backend_pid

        setattr(g, self.ARGS_STR.format(
            self.manager.sid,
            self.conn_id.encode('utf-8')
        ), None)

        register_string_typecasters(self.conn)

        manager = self.manager

        # autocommit flag does not work with asynchronous connections.
        # By default asynchronous connection runs in autocommit mode.
        self._set_auto_commit(kwargs)

        if self.array_to_string:
            register_array_to_string_typecasters(self.conn)

        # Register type casters for binary data only after registering array to
        # string type casters.
        if self.use_binary_placeholder:
            register_binary_typecasters(self.conn)
        #
        postgres_encoding, self.python_encoding = \
            get_encoding(self.conn.info.encoding)

        status, cur = self.__cursor()

        # Note that we use 'UPDATE pg_settings' for setting bytea_output as a
        # convenience hack for those running on old, unsupported versions of
        # PostgreSQL 'cos we're nice like that.
        status = self._execute(
            cur,
            "SET DateStyle=ISO; "
            "SET client_min_messages=notice; "
            "SELECT set_config('bytea_output','hex',false)"
            " FROM pg_show_all_settings()"
            " WHERE name = 'bytea_output'; "
            "SET client_encoding='{0}';".format(postgres_encoding)
        )

        if status is not None:
            self.conn.close()
            self.conn = None

            return False, status

        is_error, errmsg = self._set_role(manager, cur, conn_id, **kwargs)
        if is_error:
            return False, errmsg

        # Check database version every time on reconnection
        status = self._execute(cur, "SELECT version()")

        if status is not None:
            self.conn.close()
            self.conn = None
            self.wasConnected = False
            current_app.logger.error(
                "Failed to fetch the version information on the "
                "established connection to the database server "
                "(#{server_id}) for '{conn_id}' with below error "
                "message:{msg}".format(
                    server_id=self.manager.sid,
                    conn_id=conn_id,
                    msg=status)
            )
            return False, status

        if cur.rowcount > 0:
            row = cur.fetchmany(1)[0]
            manager.ver = row['version']
            manager.sversion = self.conn.info.server_version

        status = self._execute(cur, """
SELECT
    db.oid as did, db.datname, db.datallowconn,
    pg_encoding_to_char(db.encoding) AS serverencoding,
    has_database_privilege(db.oid, 'CREATE') as cancreate,
    datistemplate
FROM
    pg_catalog.pg_database db
WHERE db.datname = current_database()""")

        if status is None:
            manager.db_info = manager.db_info or dict()
            if cur.rowcount > 0:
                res = cur.fetchmany(1)[0]
                manager.db_info[res['did']] = res.copy()

                # We do not have database oid for the maintenance database.
                if len(manager.db_info) == 1:
                    manager.did = res['did']

                if manager.sversion >= 120000:
                    status = self._execute(cur, """
        SELECT
             gss_authenticated, encrypted
        FROM
            pg_catalog.pg_stat_gssapi
        WHERE pid = pg_backend_pid()""")
                    if status is None and cur.get_rowcount() > 0:
                        res_enc = cur.fetchmany(1)[0]
                        manager.db_info[res['did']]['gss_authenticated'] =\
                            res_enc['gss_authenticated']
                        manager.db_info[res['did']]['gss_encrypted'] = \
                            res_enc['encrypted']

                        if len(manager.db_info) == 1:
                            manager.gss_authenticated = \
                                res_enc['gss_authenticated']
                            manager.gss_encrypted = res_enc['encrypted']

        self._set_user_info(cur, manager, **kwargs)

        self._set_server_type_and_password(kwargs, manager)

        ret_msg = self.execute_post_connection_sql(cur, manager)

        manager.update_session()

        return True, ret_msg

    def _set_user_info(self, cur, manager, **kwargs):
        """
        Set user info.
        :param cur:
        :param manager:
        :return:
        """
        status = self._execute(cur, """
        SELECT
            roles.oid as id, roles.rolname as name,
            roles.rolsuper as is_superuser,
            CASE WHEN roles.rolsuper THEN true ELSE roles.rolcreaterole END as
            can_create_role,
            CASE WHEN roles.rolsuper THEN true
            ELSE roles.rolcreatedb END as can_create_db,
            CASE WHEN 'pg_signal_backend'=ANY(ARRAY(WITH RECURSIVE cte AS (
            SELECT pg_roles.oid,pg_roles.rolname FROM pg_roles
                WHERE pg_roles.oid = roles.oid
            UNION ALL
            SELECT m.roleid,pgr.rolname FROM cte cte_1
                JOIN pg_auth_members m ON m.member = cte_1.oid
                JOIN pg_roles pgr ON pgr.oid = m.roleid)
            SELECT rolname  FROM cte)) THEN True
            ELSE False END as can_signal_backend
        FROM
            pg_catalog.pg_roles as roles
        WHERE
            rolname = current_user""")

        if status is None and 'user' not in kwargs:
            manager.user_info = dict()
            if cur.get_rowcount() > 0:
                manager.user_info = cur.fetchmany(1)[0]

    def _set_server_type_and_password(self, kwargs, manager):
        """
        Set server type
        :param kwargs:
        :param manager:
        :return:
        """
        if 'password' in kwargs:
            manager.password = kwargs['password']

        server_types = None
        if 'server_types' in kwargs and isinstance(
                kwargs['server_types'], list):
            server_types = manager.server_types = kwargs['server_types']

        if server_types is None:
            from pgadmin.browser.server_groups.servers.types import ServerType
            server_types = ServerType.types()

        for st in server_types:
            if st.stype == 'ppas':
                if st.instance_of(manager.ver):
                    manager.server_type = st.stype
                    manager.server_cls = st
                    break
            else:
                if st.instance_of():
                    manager.server_type = st.stype
                    manager.server_cls = st
                    break

    def execute_post_connection_sql(self, cur, manager):
        # Execute post connection SQL if provided in the server dialog
        errmsg = None
        if manager.post_connection_sql and manager.post_connection_sql != '':
            status = self._execute(cur, manager.post_connection_sql)
            if status is not None:
                errmsg = gettext(("Failed to execute the post connection SQL "
                                  "with below error message:\n{msg}").format(
                    msg=status))
                current_app.logger.error(errmsg)
        return errmsg

    def __cursor(self, server_cursor=False, scrollable=False):

        if not get_crypt_key()[0] and config.SERVER_MODE:
            raise CryptKeyMissing()

        # Check SSH Tunnel is alive or not. If used by the database
        # server for the connection.
        if self.manager.use_ssh_tunnel == 1:
            self.manager.check_ssh_tunnel_alive()

        if self.wasConnected is False:
            raise ConnectionLost(
                self.manager.sid,
                self.db,
                None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
            )
        cur = getattr(g, self.ARGS_STR.format(
            self.manager.sid,
            self.conn_id.encode('utf-8')
        ), None)

        if self.connected() and cur and not cur.closed:
            if not server_cursor or (
                    server_cursor and type(cur) is AsyncDictServerCursor):
                return True, cur

        if not self.connected():
            errmsg = ""

            current_app.logger.warning(
                "Connection to database server (#{server_id}) for the "
                "connection - '{conn_id}' has been lost.".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id
                )
            )

            if self.auto_reconnect and not self.reconnecting:
                self.__attempt_execution_reconnect(None)
            else:
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
                )

        try:
            if server_cursor:
                # Providing name to cursor will create server side cursor.
                cursor_name = "CURSOR:{0}".format(self.conn_id)
                self.conn.server_cursor_factory = AsyncDictServerCursor
                cur = self.conn.cursor(
                    name=cursor_name,
                    scrollable=scrollable
                )
            else:
                cur = self.conn.cursor(scrollable=scrollable)
        except psycopg.Error as pe:
            current_app.logger.exception(pe)
            errmsg = gettext(
                "Failed to create cursor for psycopg3 connection with error "
                "message for the server#{1}:{2}:\n{0}"
            ).format(
                str(pe), self.manager.sid, self.db
            )

            current_app.logger.error(errmsg)
            if self.conn.closed:
                self.conn = None
                if self.auto_reconnect and not self.reconnecting:
                    current_app.logger.info(
                        gettext(
                            "Attempting to reconnect to the database server "
                            "(#{server_id}) for the connection - '{conn_id}'."
                        ).format(
                            server_id=self.manager.sid,
                            conn_id=self.conn_id
                        )
                    )
                    return self.__attempt_execution_reconnect(
                        self.__cursor, server_cursor
                    )
                else:
                    raise ConnectionLost(
                        self.manager.sid,
                        self.db,
                        None if self.conn_id[0:3] == 'DB:'
                        else self.conn_id[5:]
                    )

        setattr(
            g, self.ARGS_STR.format(
                self.manager.sid, self.conn_id.encode('utf-8')
            ), cur
        )

        return True, cur

    def reset_cursor_at(self, position):
        """
        This function is used to reset the cursor at the given position
        """
        cur = self.__async_cursor
        if not cur:
            current_app.logger.log(
                25,
                'Cursor not found in reset_cursor_at method')

        try:
            cur.scroll(position, mode='absolute')
        except psycopg.Error:
            # bypassing the error as cursor tried to scroll on the
            # specified position, but end of records found
            current_app.logger.log(
                25,
                'Failed to reset cursor in reset_cursor_at method')
        except IndexError as e:
            current_app.logger.log(
                25,
                'Psycopg3 Cursor: {0}'.format(str(e)))

    def __internal_blocking_execute(self, cur, query, params):
        """
        This function executes the query using cursor's execute function,
        but in case of asynchronous connection we need to wait for the
        transaction to be completed. If self.async_ is 1 then it is a
        blocking call.

        Args:
            cur: Cursor object
            query: SQL query to run.
            params: Extra parameters
        """

        query = query.encode(self.python_encoding)
        cur.execute(query, params)

    def execute_on_server_as_csv(self, records=2000):
        """
        To fetch query result and generate CSV output

        Args:
            params: Additional parameters
            records: Number of initial records
        Returns:
            Generator response
        """
        cur = self.__async_cursor
        if not cur:
            return False, self.CURSOR_NOT_FOUND

        if self.conn.pgconn.connect_poll() != 3:
            return False, gettext(
                "Asynchronous query execution/operation underway."
            )

        encoding = self.python_encoding
        query = None
        try:
            query = str(cur.query, encoding) \
                if cur and cur.query is not None else None
        except Exception:
            current_app.logger.warning('Error encoding query with {0}'.format(
                encoding))

        current_app.logger.log(
            25,
            "Execute (with server cursor) by {pga_user} on "
            "{db_user}@{db_host}/{db_name} #{server_id} -  "
            "{conn_id} (Query-id: {query_id}):\n{query}".format(
                pga_user=current_user.email,
                db_user=self.conn.info.user,
                db_host=self.conn.info.host,
                db_name=self.conn.info.dbname,
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=self.__async_query_id
            )
        )

        # http://initd.org/psycopg/docs/cursor.html#cursor.description
        # to avoid no-op
        if cur.description is None:
            return False, \
                gettext('The query executed did not return any data.')

        def handle_null_values(results, replace_nulls_with):
            """
            This function is used to replace null values with the given string

            :param results:
            :param replace_nulls_with: null values will be replaced by this
            string.
            :return: modified result
            """

            temp_results = []
            for row in results:
                res = dict()
                for k, v in row.items():
                    if v is None:
                        res[k] = replace_nulls_with
                    else:
                        res[k] = v
                temp_results.append(res)
            results = temp_results

            return results

        def gen(conn_obj, trans_obj, quote='strings', quote_char="'",
                field_separator=',', replace_nulls_with=None):

            try:
                cur.scroll(0, mode='absolute')
            except Exception as e:
                print(str(e))
            results = cur.fetchmany(records)
            if not results:
                yield gettext('The query executed did not return any data.')
                return

            header = []
            json_columns = []

            for c in cur.ordered_description():
                # This is to handle the case in which column name is non-ascii
                column_name = c.to_dict()['name']
                header.append(column_name)
                if c.to_dict()['type_code'] in ALL_JSON_TYPES:
                    json_columns.append(column_name)

            res_io = StringIO()

            if quote == 'strings':
                quote = csv.QUOTE_NONNUMERIC
            elif quote == 'all':
                quote = csv.QUOTE_ALL
            else:
                quote = csv.QUOTE_NONE

            csv_writer = csv.DictWriter(
                res_io, fieldnames=header, delimiter=field_separator,
                quoting=quote,
                quotechar=quote_char,
                replace_nulls_with=replace_nulls_with
            )

            csv_writer.writeheader()
            # Replace the null values with given string if configured.
            if replace_nulls_with is not None:
                results = handle_null_values(results, replace_nulls_with)
            csv_writer.writerows(results)

            yield res_io.getvalue()

            while True:
                results = cur.fetchmany(records)

                if not results:
                    break
                res_io = StringIO()

                csv_writer = csv.DictWriter(
                    res_io, fieldnames=header, delimiter=field_separator,
                    quoting=quote,
                    quotechar=quote_char,
                    replace_nulls_with=replace_nulls_with
                )

                # Replace the null values with given string if configured.
                if replace_nulls_with is not None:
                    results = handle_null_values(results, replace_nulls_with)
                csv_writer.writerows(results)
                yield res_io.getvalue()

            try:
                # try to reset the cursor scroll back to where it was,
                # bypass error, if cannot scroll back
                rows_fetched_from = trans_obj.get_fetched_row_cnt()
                cur.scroll(rows_fetched_from, mode='absolute')
            except psycopg.Error:
                # bypassing the error as cursor tried to scroll on the
                # specified position, but end of records found
                pass
            except Exception:
                pass

        # Registering back type caster for large size data types to string
        # which was unregistered at starting
        register_string_typecasters(self.conn)
        return True, gen, self

    def execute_scalar(self, query, params=None,
                       formatted_exception_msg=False):
        status, cur = self.__cursor()
        self.row_count = 0

        if not status:
            return False, str(cur)
        query_id = str(secrets.choice(range(1, 9999999)))

        current_app.logger.log(
            25,
            "Execute (scalar) by {pga_user} on "
            "{db_user}@{db_host}/{db_name} #{server_id} - "
            "{conn_id} (Query-id: {query_id}):\n{query}".format(
                pga_user=current_user.email,
                db_user=self.conn.info.user,
                db_host=self.conn.info.host,
                db_name=self.conn.info.dbname,
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg.Error as pe:
            cur.close_cursor()
            if not self.connected():
                if self.auto_reconnect and not self.reconnecting:
                    return self.__attempt_execution_reconnect(
                        self.execute_scalar, query, params,
                        formatted_exception_msg
                    )
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                "Failed to execute query (execute_scalar) for the server "
                "#{server_id} - {conn_id} (Query-id: {query_id}):\n"
                "Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            return False, errmsg
        except Exception:
            print("EXCEPTION.....")

        # If multiple queries are run, make sure to reach
        # the last query result
        while cur.nextset():
            pass  # This loop is empty

        self.row_count = cur.get_rowcount()
        if cur.get_rowcount() > 0:
            res = cur.fetchone()
            if len(res) > 0:
                return True, res[0]

        return True, None

    def release_async_cursor(self):
        if self.__async_cursor and not self.__async_cursor.closed:
            try:
                self.__async_cursor.close_cursor()
            except Exception as e:
                print("EXception==", str(e))

    def execute_async(self, query, params=None, formatted_exception_msg=True,
                      server_cursor=False):
        """
        This function executes the given query asynchronously and returns
        result.

        Args:
            query: SQL query to run.
            params: extra parameters to the function
            formatted_exception_msg: if True then function return the
            formatted exception message
        """
        self.__async_cursor = None
        self.__async_query_error = None

        status, cur = self.__cursor(scrollable=True,
                                    server_cursor=server_cursor)

        if not status:
            return False, str(cur)
        query_id = str(secrets.choice(range(1, 9999999)))

        encoding = self.python_encoding

        query = query.encode(encoding)
        self.__async_cursor = cur
        self.__async_query_id = query_id

        current_app.logger.log(
            25,
            "Execute (async) by {pga_user} on {db_user}@{db_host}/{db_name} "
            "#{server_id} - {conn_id} (Query-id: "
            "{query_id}):\n{query}".format(
                pga_user=current_user.username,
                db_user=self.conn.info.user,
                db_host=self.conn.info.host,
                db_name=self.conn.info.dbname,
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query.decode(encoding),
                query_id=query_id
            )
        )

        try:
            self.__notices = []
            self.__notifies = []
            self.execution_aborted = False
            cur.execute(query, params)
        except psycopg.Error as pe:
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                "Failed to execute query (execute_async) for the server "
                "#{server_id} - {conn_id}(Query-id: {query_id}):\n"
                "Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            self.__async_query_error = errmsg

            if self.conn and self.conn.closed or self.is_disconnected(pe):
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
                )

            return False, errmsg

        return True, None

    def execute_void(self, query, params=None, formatted_exception_msg=False):
        """
        This function executes the given query with no result.

        Args:
            query: SQL query to run.
            params: extra parameters to the function
            formatted_exception_msg: if True then function return the
            formatted exception message
        """
        status, cur = self.__cursor()

        if not status:
            return False, str(cur)
        query_id = str(secrets.choice(range(1, 9999999)))

        current_app.logger.log(
            25,
            "Execute (void) by {pga_user} on "
            "{db_user}@{db_host}/{db_name} #{server_id} - "
            "{conn_id} (Query-id: {query_id}):\n{query}".format(
                pga_user=current_user.email,
                db_user=self.conn.info.user,
                db_host=self.conn.info.host,
                db_name=self.conn.info.dbname,
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )

        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg.Error as pe:
            cur.close_cursor()
            if not self.connected():
                if self.auto_reconnect and not self.reconnecting:
                    return self.__attempt_execution_reconnect(
                        self.execute_void, query, params,
                        formatted_exception_msg
                    )
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                "Failed to execute query (execute_void) for the server "
                "#{server_id} - {conn_id}(Query-id: {query_id}):\n"
                "Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            return False, errmsg

        return True, None

    def __attempt_execution_reconnect(self, fn, *args, **kwargs):
        self.reconnecting = True
        setattr(g, self.ARGS_STR.format(
            self.manager.sid,
            self.conn_id.encode('utf-8')
        ), None)
        try:
            status, res = self.connect()
            if status:
                if fn:
                    status, res = fn(*args, **kwargs)
                    self.reconnecting = False
                return status, res
        except Exception as e:
            current_app.logger.exception(e)
            self.reconnecting = False

            current_app.logger.warning(
                "Failed to reconnect the database server "
                "(Server #{server_id}, Connection #{conn_id})".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id
                )
            )
        self.reconnecting = False
        raise ConnectionLost(
            self.manager.sid,
            self.db,
            None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
        )

    def execute_2darray(self, query, params=None,
                        formatted_exception_msg=False):
        status, cur = self.__cursor()
        self.row_count = 0

        if not status:
            return False, str(cur)

        query_id = str(secrets.choice(range(1, 9999999)))
        current_app.logger.log(
            25,
            "Execute (2darray) by {pga_user} on "
            "{db_user}@{db_host}/{db_name} #{server_id} - "
            "{conn_id} (Query-id: {query_id}):\n{query}".format(
                pga_user=current_user.email,
                db_user=self.conn.info.user,
                db_host=self.conn.info.host,
                db_name=self.conn.info.dbname,
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg.Error as pe:
            cur.close_cursor()
            if not self.connected() and self.auto_reconnect and \
                    not self.reconnecting:
                return self.__attempt_execution_reconnect(
                    self.execute_2darray, query, params,
                    formatted_exception_msg
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                "Failed to execute query (execute_2darray) for the server "
                "#{server_id} - {conn_id} (Query-id: {query_id}):\n"
                "Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            return False, errmsg

        # Get Resultset Column Name, Type and size
        columns = cur.description and [
            desc.to_dict() for desc in cur.ordered_description()
        ] or []

        rows = []
        self.row_count = cur.get_rowcount()
        if cur.get_rowcount() > 0:
            rows = cur.fetchall()

        return True, {'columns': columns, 'rows': rows}

    def execute_dict(self, query, params=None, formatted_exception_msg=False):
        status, cur = self.__cursor()
        self.row_count = 0

        if not status:
            return False, str(cur)
        query_id = str(secrets.choice(range(1, 9999999)))
        current_app.logger.log(
            25,
            "Execute (dict) by {pga_user} on "
            "{db_user}@{db_host}/{db_name} #{server_id} - "
            "{conn_id} (Query-id: {query_id}):\n{query}".format(
                pga_user=current_user.email,
                db_user=self.conn.info.user,
                db_host=self.conn.info.host,
                db_name=self.conn.info.dbname,
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg.Error as pe:
            cur.close_cursor()
            if not self.connected():
                if self.auto_reconnect and not self.reconnecting:
                    return self.__attempt_execution_reconnect(
                        self.execute_dict, query, params,
                        formatted_exception_msg
                    )
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                "Failed to execute query (execute_dict) for the server "
                "#{server_id}- {conn_id} (Query-id: {query_id}):\n"
                "Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    query_id=query_id,
                    errmsg=errmsg
                )
            )
            return False, errmsg

        # Get Resultset Column Name, Type and size
        columns = cur.description and [
            desc.to_dict() for desc in cur.ordered_description()
        ] or []

        rows = []
        self.row_count = cur.rowcount

        # If multiple queries are run, make sure to reach
        # the last query result
        while cur.nextset():
            pass  # This loop is empty

        if cur.get_rowcount() > 0:
            rows = cur.fetchall()

        return True, {'columns': columns, 'rows': rows}

    def async_fetchmany_2darray(self, records=2000,
                                from_rownum=0, to_rownum=0,
                                formatted_exception_msg=False):
        """
        User should poll and check if status is ASYNC_OK before calling this
        function
        Args:
          records: no of records to fetch. use -1 to fetchall.
          formatted_exception_msg:
          for_download: if True, will fetch all records and reset the cursor

        Returns:

        """
        cur = self.__async_cursor
        if not cur:
            return False, self.CURSOR_NOT_FOUND

        if not self.conn:
            raise ConnectionLost(
                self.manager.sid,
                self.db,
                None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
            )
        if self.conn.pgconn.is_busy():
            return False, gettext(
                "Asynchronous query execution/operation underway."
            )

        more_results = True
        while more_results:
            if cur.get_rowcount() > 0:
                result = []
                try:
                    if records == -1:
                        result = cur.fetchwindow(
                            from_rownum=0, to_rownum=cur.get_rowcount() - 1,
                            _tupples=True)
                    elif records is None:
                        result = cur.fetchwindow(from_rownum=from_rownum,
                                                 to_rownum=to_rownum,
                                                 _tupples=True)
                    else:
                        result = cur.fetchmany(records, _tupples=True)
                except psycopg.ProgrammingError:
                    result = None
            else:
                # User performed operation which dose not produce record/s as
                # result.
                # for eg. DDL operations.
                return True, None
            more_results = cur.nextset()

        return True, result

    def connected(self):
        if self.conn:
            if not self.conn.closed:
                return True
            self.conn = None
        return False

    def _decrypt_password(self, manager):
        """
        Decrypt password
        :param manager: Manager for get password.
        :return:
        """
        password = getattr(manager, 'password', None)
        if password:
            # Fetch Logged in User Details.
            user = User.query.filter_by(id=current_user.id).first()

            if user is None:
                return False, self.UNAUTHORIZED_REQUEST, password

            crypt_key_present, crypt_key = get_crypt_key()
            if not crypt_key_present:
                return False, crypt_key, password

            password = decrypt(password, crypt_key).decode()
        return True, '', password

    def reset(self):
        if self.conn and self.conn.closed:
            self.conn = None
        pg_conn = None
        manager = self.manager

        is_return, return_value, password = self._decrypt_password(manager)
        if is_return:
            return False, return_value

        try:
            with ConnectionLocker(manager.kerberos_conn):
                # Create the connection string
                connection_string = manager.create_connection_string(
                    self.db, manager.user, password)

                pg_conn = psycopg.connect(connection_string,
                                          cursor_factory=DictCursor)
        except psycopg.Error as e:
            if hasattr(e, 'pgerror'):
                msg = e.pgerror
            elif hasattr(e, 'message'):
                msg = e.message
            elif e.diag.message_detail:
                msg = e.diag.message_detail
            else:
                msg = str(e)

            current_app.logger.error(
                gettext(
                    """
Failed to reset the connection to the server due to following error:
{0}"""
                ).Format(msg)
            )
            return False, msg

        pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH)
        self.conn = pg_conn
        self.__backend_pid = pg_conn.info.backend_pid

        return True, None

    def transaction_status(self):
        if self.conn and self.conn.info:
            return self.conn.info.transaction_status
        return None

    def async_query_error(self):
        return self.__async_query_error

    def ping(self):
        return self.execute_scalar('SELECT 1')

    def _release(self):
        if self.wasConnected:
            if self.conn:
                if self.async_ == 0:
                    self.conn.close()
                elif self.async_ == 1:
                    self._close_async()
                self.conn = None
            self.password = None
            self.wasConnected = False

    def _close_async(self):
        async def _close_conn(conn):
            if conn:
                await conn.close()
        asyncio.run(_close_conn(self.conn))

    def _wait(self, conn):
        pass  # This function is empty

    def _wait_timeout(self, conn, time):
        pass  # This function is empty

    def poll(self, formatted_exception_msg=False, no_result=False):
        cur = self.__async_cursor

        if self.conn and self.conn.info.transaction_status == 1:
            status = 3
        elif self.__async_query_error:
            return False, self.__async_query_error
        elif self.conn and self.conn.pgconn.error_message:
            return False, self.conn.pgconn.error_message
        else:
            status = 1

        if not cur or cur.closed:
            return False, self.CURSOR_NOT_FOUND

        result = None
        self.row_count = 0
        self.column_info = None

        current_app.logger.log(
            25,
            "Polling result for (Query-id: {query_id})".format(
                query_id=self.__async_query_id
            )
        )
        more_result = True
        while more_result:
            if self.conn:
                if cur.description is not None:
                    self.column_info = [desc.to_dict() for
                                        desc in cur.ordered_description()]

                    pos = 0
                    if self.column_info:
                        for col in self.column_info:
                            col['pos'] = pos
                            pos += 1
                else:
                    self.column_info = None
                self.row_count = cur.get_rowcount()
                if not no_result and cur.get_rowcount() > 0:
                    result = []
                    try:
                        result = cur.fetchall(_tupples=True)
                    except psycopg.ProgrammingError:
                        result = None
                    except psycopg.Error:
                        result = None
            more_result = cur.nextset()

        return status, result

    def status_message(self):
        """
        This function will return the status message returned by the last
        command executed on the server.
        """
        cur = self.__async_cursor
        if not cur:
            return self.CURSOR_NOT_FOUND

        current_app.logger.log(
            25,
            "Status message for (Query-id: {query_id})".format(
                query_id=self.__async_query_id
            )
        )

        return cur.statusmessage

    def rows_affected(self):
        """
        This function will return the no of rows affected by the last command
        executed on the server.
        """

        return self.row_count

    @property
    def total_rows(self):
        if self.__async_cursor is None:
            return 0
        return self.__async_cursor.rowcount

    def get_column_info(self):
        """
        This function will returns list of columns for last async sql command
        executed on the server.
        """

        return self.column_info

    def cancel_transaction(self, conn_id, did=None):
        """
        This function is used to cancel the running transaction
        of the given connection id and database id using
        PostgreSQL's pg_cancel_backend.

        Args:
            conn_id: Connection id
            did: Database id (optional)
        """
        cancel_conn = self.manager.connection(did=did, conn_id=conn_id)
        query = """SELECT pg_cancel_backend({0});""".format(
            cancel_conn.__backend_pid)

        status = True
        msg = ''

        # if backend pid is same then create a new connection
        # to cancel the query and release it.
        if cancel_conn.__backend_pid == self.__backend_pid:
            password = getattr(self.manager, 'password', None)
            if password:
                # Fetch Logged in User Details.
                user = User.query.filter_by(id=current_user.id).first()
                if user is None:
                    return False, self.UNAUTHORIZED_REQUEST

                crypt_key_present, crypt_key = get_crypt_key()
                if not crypt_key_present:
                    return False, crypt_key
                password = decrypt(password, crypt_key).decode()

            try:
                with ConnectionLocker(self.manager.kerberos_conn):
                    connection_string = self.manager.create_connection_string(
                        self.db, self.manager.user, password)

                    pg_conn = psycopg.connect(connection_string,
                                              cursor_factory=DictCursor)

                # Get the cursor and run the query
                cur = pg_conn.cursor()
                cur.execute(query)

                # Close the connection
                pg_conn.close()

            except psycopg.Error as e:
                status = False
                if hasattr(e, 'pgerror'):
                    msg = e.pgerror
                elif e.diag.message_detail:
                    msg = e.diag.message_detail
                else:
                    msg = str(e)
                return status, msg
        else:
            if self.connected():
                status, msg = self.execute_void(query)

                if status:
                    cancel_conn.execution_aborted = True
            else:
                status = False
                msg = gettext("Not connected to the database server.")

        return status, msg

    def messages(self):
        """
        Returns the list of the messages/notices send from the database server.
        """
        resp = []

        if self.__notices is not None:
            while self.__notices:
                resp.append(self.__notices.pop(0))

        if self.__notifies is None:
            return resp

        for notify in self.__notifies:
            if notify.payload is not None and notify.payload != '':
                notify_msg = gettext(
                    "Asynchronous notification \"{0}\" with payload \"{1}\" "
                    "received from server process with PID {2}\n"
                ).format(notify.channel, notify.payload, notify.pid)

            else:
                notify_msg = gettext(
                    "Asynchronous notification \"{0}\" received from "
                    "server process with PID {1}\n"
                ).format(notify.channel, notify.pid)
            resp.append(notify_msg)

        return resp

    def _formatted_exception_msg(self, exception_obj, formatted_msg):
        """
        This method is used to parse the psycopg.Error object and returns the
        formatted error message if flag is set to true else return
        normal error message.

        Args:
            exception_obj: exception object
            formatted_msg: if True then function return the formatted exception
            message

        """
        if hasattr(exception_obj, 'pgerror'):
            errmsg = exception_obj.pgerror
        elif hasattr(exception_obj, 'diag') and \
            hasattr(exception_obj.diag, 'message_detail') and\
                exception_obj.diag.message_detail is not None:
            errmsg = exception_obj.diag.message_primary + '\n' + \
                exception_obj.diag.message_detail
        else:
            errmsg = str(exception_obj)

        # if formatted_msg is false then return from the function
        if not formatted_msg:
            notices = self.get_notices()
            return errmsg if notices == '' else notices + '\n' + errmsg

        # Do not append if error starts with `ERROR:` as most pg related
        # error starts with `ERROR:`
        if not errmsg.startswith('ERROR:'):
            errmsg = gettext('ERROR:  ') + errmsg + ' \n\n'

        if exception_obj.diag.severity is not None \
                and exception_obj.diag.message_primary is not None:
            ex_diag_message = "{0}:  {1}".format(
                exception_obj.diag.severity,
                exception_obj.diag.message_primary
            )
            # If both errors are different then only append it
            if errmsg and ex_diag_message and \
                ex_diag_message.strip().strip('\n').lower() not in \
                    errmsg.strip().strip('\n').lower():
                errmsg += ex_diag_message
        elif exception_obj.diag.message_primary is not None:
            message_primary = exception_obj.diag.message_primary
            if message_primary.lower() not in errmsg.lower():
                errmsg += message_primary

        if exception_obj.diag.sqlstate is not None:
            if not errmsg.endswith('\n'):
                errmsg += '\n'
            errmsg += gettext('SQL state: ')
            errmsg += exception_obj.diag.sqlstate

        if exception_obj.diag.message_detail is not None and \
                'Detail:'.lower() not in errmsg.lower():
            if not errmsg.endswith('\n'):
                errmsg += '\n'
            errmsg += gettext('Detail: ')
            errmsg += exception_obj.diag.message_detail

        if exception_obj.diag.message_hint is not None and \
                'Hint:'.lower() not in errmsg.lower():
            if not errmsg.endswith('\n'):
                errmsg += '\n'
            errmsg += gettext('Hint: ')
            errmsg += exception_obj.diag.message_hint

        if exception_obj.diag.statement_position is not None and \
                'Character:'.lower() not in errmsg.lower():
            if not errmsg.endswith('\n'):
                errmsg += '\n'
            errmsg += gettext('Character: ')
            errmsg += exception_obj.diag.statement_position

        if exception_obj.diag.context is not None and \
                'Context:'.lower() not in errmsg.lower():
            if not errmsg.endswith('\n'):
                errmsg += '\n'
            errmsg += gettext('Context: ')
            errmsg += exception_obj.diag.context

        notices = self.get_notices()
        return errmsg if notices == '' else notices + '\n' + errmsg

    #####
    # As per issue reported on pgsycopg2 github repository link is shared below
    # conn.closed is not reliable enough to identify the disconnection from the
    # database server for some unknown reasons.
    #
    # (https://github.com/psycopg/psycopg2/issues/263)
    #
    # In order to resolve the issue, sqlalchamey follows the below logic to
    # identify the disconnection. It relies on exception message to identify
    # the error.
    #
    # Reference (MIT license):
    # https://github.com/zzzeek/sqlalchemy/blob/master/lib/sqlalchemy/dialects/postgresql/psycopg2.py
    #
    def is_disconnected(self, err):
        if self.conn and not self.conn.closed:
            # checks based on strings.  in the case that .closed
            # didn't cut it, fall back onto these.
            str_e = str(err).partition("\n")[0]
            for msg in [
                # these error messages from libpq: interfaces/libpq/fe-misc.c
                # and interfaces/libpq/fe-secure.c.
                'terminating connection',
                'closed the connection',
                'connection not open',
                'could not receive data from server',
                'could not send data to server',
                'connection already closed',
                'cursor already closed',
                # not sure where this path is originally from, it may
                # be obsolete.   It really says "losed", not "closed".
                'losed the connection unexpectedly',
                # these can occur in newer SSL
                'connection has been closed unexpectedly',
                'SSL SYSCALL error: Bad file descriptor',
                'SSL SYSCALL error: EOF detected',
                'terminating connection due to administrator command'
            ]:
                idx = str_e.find(msg)
                if idx >= 0 and '"' not in str_e[:idx]:
                    return True

            return False
        return True

    def check_notifies(self, n=None):
        """
        Check for the notify messages by polling the connection or after
        execute is there in notifies.
        """

        if n:
            if self.__notifies is None:
                self.__notifies = []
            self.__notifies.append(n)

    def get_notifies(self):
        """
        This function will returns list of notifies received from database
        server.
        """
        notifies = None
        # Convert list of Notify objects into list of Dict.
        if self.__notifies is not None and len(self.__notifies) > 0:
            notifies = [{'recorded_time': str(datetime.datetime.now()),
                         'channel': notify.channel,
                         'payload': notify.payload,
                         'pid': notify.pid
                         } for notify in self.__notifies
                        ]
            self.__notifies = None
        return notifies

    def get_notices(self, diag=None):
        """
        This function will returns the notices as string.
        :return:
        """
        notices = ''
        # Check for notices.
        if diag and hasattr(diag, 'message_primary'):
            if self.__notices is None:
                self.__notices = []
            self.__notices.append(f"{diag.severity}:"
                                  f"  {diag.message_primary}\n")

        if diag is None:
            while self.__notices:
                notices += self.__notices.pop(0)

        return notices

    def pq_encrypt_password_conn(self, password, user):
        """
        This function will return the encrypted password for database server
        :param password: password to be encrypted
        :param user: user of the database server
        :return:
        """
        enc_password = None
        if self.connected():
            status, enc_algorithm = \
                self.execute_scalar("SHOW password_encryption")
            if status:
                encoding = self.conn.info.encoding
                enc_password = self.conn.pgconn.encrypt_password(
                    password.encode(encoding), user.encode(encoding),
                    enc_algorithm.encode(encoding)
                ).decode()

        return enc_password

    def mogrify(self, query, parameters):
        """
        This function will return the sql query after parameters binding
        :param query: sql query before parameters (variables) binding
        :param parameters: query parameters / variables
        :return:
        """
        status, _ = self.__cursor()
        if not status:
            return None
        else:

            if parameters:
                with psycopg.ClientCursor(self.conn) as _cur:
                    return _cur.mogrify(query, parameters)
            else:
                return query
